VictorialMetrics存储原理之索引存储格式
前文我们介绍了当插入数据的时候会先去添加索引数据,索引构建完成后又是如何去持久化数据的呢?保存的数据又是怎样的格式呢?本节我们将对此进行详细讲解。
添加索引数据
索引构建完成后会调用 AddItems
函数将索引添加到 Table
中去:
// lib/mergeset/table.go
// AddItems 添加指定的 items 到 table 中去
func (tb *Table) AddItems(items [][]byte) error {
if err := tb.rawItems.addItems(tb, items); err != nil {
return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)
}
return nil
}
Table
的结构如下所示:
// lib/mergeset/table.go
// Table 代表 mergeset table.
type Table struct {
activeMerges uint64
mergesCount uint64
itemsMerged uint64
assistedMerges uint64
// merge 索引
mergeIdx uint64
// 路径
path string
// flush回调
flushCallback func()
flushCallbackWorkerWG sync.WaitGroup
needFlushCallbackCall uint32
// 在将指定项的整个块刷新到持久存储之前,在合并期间调用的回调
prepareBlock PrepareBlockCallback
// parts 列表
partsLock sync.Mutex
parts []*partWrapper
// rawItems 包含最近添加的尚未转换为 parts 的数据
// 出于性能原因,未在搜索中使用 rawItems
rawItems rawItemsShards
snapshotLock sync.RWMutex
flockF *os.File
stopCh chan struct{}
partMergersWG syncwg.WaitGroup
rawItemsFlusherWG sync.WaitGroup
convertersWG sync.WaitGroup
rawItemsPendingFlushesWG syncwg.WaitGroup
}
一个索引 Table
就对应着一个 indexDB,也就是数据目录 indexdb
下面的文件夹:
其中核心的是 parts
和 rawItems
两个属性。
parts
主要是存储 merge 后的 blocks,一个part
与文件系统上的一个目录对应,比如上图中的24_1_16F4A862471C1DC9
目录就是一个part
。rawItems
是用于预处理Items
的,是一个rawItemsShards
对象。
rawItemsShards
结构体定义如下所示:
// lib/mergeset/table.go
type rawItemsShards struct {
shardIdx uint32
// 在多 cpu 系统上添加 rows 数据时,shards 分片可以减少锁竞争
shards []rawItemsShard
}
// 每个 table 的 rawItems 分片数
var rawItemsShardsPerTable = cgroup.AvailableCPUs()
// 每个分片最大的Block数
const maxBlocksPerShard = 512
// 当在打开Table的时候就会调用该函数进行初始化
func (riss *rawItemsShards) init() {
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
}
// 添加 items 元素
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error {
n := atomic.AddUint32(&riss.shardIdx, 1)
shards := riss.shards
idx := n % uint32(len(shards))
shard := &shards[idx]
return shard.addItems(tb, items)
}
rawItemsShards
其实就是加了一个分片功能用于保存索引数据,addItems
函数就是将要添加的数据添加到对应的分片上去,最终执行的逻辑是 shard.addItems
。
// lib/mergeset/table.go
type rawItemsShard struct {
mu sync.Mutex
ibs []*inmemoryBlock
lastFlushTime uint64
}
// 添加items元素
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
var err error
var blocksToFlush []*inmemoryBlock
ris.mu.Lock()
ibs := ris.ibs
if len(ibs) == 0 {
ib := getInmemoryBlock()
ibs = append(ibs, ib)
ris.ibs = ibs
}
// 取最后一个内存块
ib := ibs[len(ibs)-1]
for _, item := range items {
// 添加索引item到内存块
if !ib.Add(item) { // 超过了内存块大小
// 重新获取一个内存块,此时肯定为空
ib = getInmemoryBlock()
// 重新添加
if !ib.Add(item) {
putInmemoryBlock(ib)
err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
break
}
ibs = append(ibs, ib)
ris.ibs = ibs
}
}
// 超过了每个分片的最大内存块的数量
if len(ibs) >= maxBlocksPerShard {
// 将内存块放到待刷新的内存块列表中去
blocksToFlush = append(blocksToFlush, ibs...)
// 释放前面的内存块资源
for i := range ibs {
ibs[i] = nil
}
ris.ibs = ibs[:0]
ris.lastFlushTime = fasttime.UnixTimestamp()
}
ris.mu.Unlock()
// 执行merge合并操作
tb.mergeRawItemsBlocks(blocksToFlush, false)
return err
}
// lib/mergeset/encoding.go
// 内存中的一个Block块结构
type inmemoryBlock struct {
commonPrefix []byte
data []byte // 用来存储数据
items []Item // 用来存储每个item数据的起始偏移量
}
// Item 表示用于存储在 mergeset 中的单个 item 数据
type Item struct {
// 数据的开始偏移量
Start uint32
// 数据的结束偏移量
End uint32
}
// maxInmemoryBlockSize 是 memoryblock.data 的最大值。
//
// 它必须适合 CPU 缓存大小,即当前 CPU 的缓存大小为64kb。
const maxInmemoryBlockSize = 64 * 1024
// Add 将 x 添加到内存卡 ib 的末尾
//
// 如果由于块大小限制,x 未添加到 ib,则返回 false
func (ib *inmemoryBlock) Add(x []byte) bool {
data := ib.data
// 操过块大小限制了
if len(x)+len(data) > maxInmemoryBlockSize {
return false
}
if cap(data) == 0 {
// 预分配 data 和 items 以减少内存分配
data = make([]byte, 0, maxInmemoryBlockSize)
ib.items = make([]Item, 0, 512)
}
dataLen := len(data)
data = append(data, x...) // 将 x 添加到 data
ib.items = append(ib.items, Item{ // 更新 items
Start: uint32(dataLen),
End: uint32(len(data)),
})
ib.data = data
return true
}
rawItemsShard
表示保存索引数据的一个分片,里面其实就是一个 inmemoryBlock
的内存块切片,每个分片最多有 512 个内存块,每个内存块占用 64KB 的容量,当每个分片中的内存块数量超过最大数量(512)会去将内存块数据刷新为 Part
。
如果分片中的内存块数量没超过上限,则会通过一个任务去定时(1s)将 rawItem 数据刷新(转换)为 Part
,以便它们对搜索可见。
// lib/mergeset/table.go
// 将最近的 rawItem 刷新(转换)为 Part,以便它们对搜索可见。
const rawItemsFlushInterval = time.Second
// 启动 rawItems Flusher 任务
func (tb *Table) startRawItemsFlusher() {
tb.rawItemsFlusherWG.Add(1)
go func() {
tb.rawItemsFlusher()
tb.rawItemsFlusherWG.Done()
}()
}
func (tb *Table) rawItemsFlusher() {
ticker := time.NewTicker(rawItemsFlushInterval)
defer ticker.Stop()
for {
select {
case <-tb.stopCh:
return
case <-ticker.C:
tb.flushRawItems(false)
}
}
}
合并内存数据
将内存块数据转换为 Part
都是通过 mergeRawItemsBlocks
函数去实现的。
// lib/mergeset/table.go
// 一次合并的默认 parts 数
//
// 这个数字是根据经验得出的,它提供了尽可能低的开销
// 有关详细信息,请参阅 appendPartsToMerge test
const defaultPartsToMerge = 15
// merge 内存块数据
func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {
if len(ibs) == 0 {
return
}
tb.partMergersWG.Add(1)
defer tb.partMergersWG.Done()
pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
var pwsLock sync.Mutex
var wg sync.WaitGroup
for len(ibs) > 0 {
// 一次最大合并的内存块数量
n := defaultPartsToMerge
if n > len(ibs) {
n = len(ibs)
}
wg.Add(1)
go func(ibsPart []*inmemoryBlock) {
defer wg.Done()
// merge inmemoryBlock
pw := tb.mergeInmemoryBlocks(ibsPart)
if pw == nil {
return
}
pw.isInMerge = true
pwsLock.Lock()
pws = append(pws, pw)
pwsLock.Unlock()
}(ibs[:n])
ibs = ibs[n:]
}
wg.Wait()
if len(pws) > 0 {
if err := tb.mergeParts(pws, nil, true); err != nil {
logger.Panicf("FATAL: cannot merge raw parts: %s", err)
}
if tb.flushCallback != nil {
if isFinal {
tb.flushCallback()
} else {
atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
}
}
}
for {
tb.partsLock.Lock()
ok := len(tb.parts) <= maxParts
tb.partsLock.Unlock()
if ok {
return
}
// The added part exceeds maxParts count. Assist with merging other parts.
//
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
err := tb.mergeExistingParts(false)
storagepacelimiter.Search.Dec()
if err == nil {
atomic.AddUint64(&tb.assistedMerges, 1)
continue
}
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
return
}
logger.Panicf("FATAL: cannot merge small parts: %s", err)
}
}
mergeRawItemsBlocks
函数将指定的内存块进行 merge 合并操作,一次合并最大的内存块数量为 15,然后在独立的 goroutine 中去进行合并操作,使用 mergeInmemoryBlocks
函数。
// lib/mergeset/table.go
// merge InmemoryBlocks
func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
// 将 InmemoryBlock 列表转换成 inmemoryPart 列表
// inmemoryPart 表示内存中的Part
mps := make([]*inmemoryPart, 0, len(ibs))
for _, ib := range ibs {
if len(ib.items) == 0 {
continue
}
mp := getInmemoryPart()
mp.Init(ib) // 将inmemoryBlock转换为inmemoryPart
putInmemoryBlock(ib)
mps = append(mps, mp)
}
if len(mps) == 0 {
return nil
}
if len(mps) == 1 {
// 没有要合并的内容。只需返回单个 inmemory part。
mp := mps[0]
p := mp.NewPart()
return &partWrapper{
p: p,
mp: mp,
refCount: 1,
}
}
defer func() {
for _, mp := range mps {
putInmemoryPart(mp)
}
}()
atomic.AddUint64(&tb.mergesCount, 1)
atomic.AddUint64(&tb.activeMerges, 1)
defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
// 为每个 `inmemoryPart` 构造 `blockStreamReader`, 用于迭代读取 items
bsrs := make([]*blockStreamReader, 0, len(mps))
for _, mp := range mps {
bsr := getBlockStreamReader()
bsr.InitFromInmemoryPart(mp)
bsrs = append(bsrs, bsr)
}
// 准备一个 blockStreamWriter 用于合并写入的 part
bsw := getBlockStreamWriter()
// 不要通过 getInmemoryPart() 获取 mpDst,因为与池中的其他条目相比,它的大小可能太大。
// 这可能会导致内存使用量增加,因为存在大量的碎片。
// 创建一个新的 inmemoryPart,接收合并的数据
mpDst := &inmemoryPart{}
bsw.InitFromInmemoryPart(mpDst)
// 开始 merge 数据
// 该 merge 不应该被 stopCh 中断,因为它可能是 stopCh 关闭后的最终结果
err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged)
if err != nil {
logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
}
putBlockStreamWriter(bsw)
for _, bsr := range bsrs {
putBlockStreamReader(bsr)
}
p := mpDst.NewPart()
return &partWrapper{
p: p,
mp: mpDst,
refCount: 1,
}
}
上面的函数会将指定的内存块转换成 partWrapper
,该结构就是一个包含 part
和 inmemoryPart
的包装器。
// lib/mergeset/table.go
type partWrapper struct {
p *part
mp *inmemoryPart
refCount uint64
isInMerge bool
}
part
的结构如下所示:
// lib/mergeset/part.go
type part struct {
ph partHeader
path string
size uint64
mrs []metaindexRow
indexFile fs.MustReadAtCloser
itemsFile fs.MustReadAtCloser
lensFile fs.MustReadAtCloser
}
一个 part
就是 Table
下面的一个数据目录。
part
中包含一个 partHeader
,该属性中包含当前 part
的一些 Meta 信息,一共有多少个 items、有多少 blocks、第一个和最后一个 item,对应着 part
目录下面的 metadata.json
文件。
// lib/mergeset/part_header.go
type partHeader struct {
// part 包含的 items 数
itemsCount uint64
// part 包含的 blocks 数
blocksCount uint64
// part 中的第一个 item
firstItem []byte
// part 中的最后一个 item
lastItem []byte
}
part
中另外的属性 path
表示当前 part
的路径,size
表示大小,另外三个属性 indexFile
、itemsFile
、lensFile
对应中 part
目录下面的三个文件:index.bin
、items.bin
、lens.bin
。此外 part
结构中还有最后一个 mrs
属性,是一个 []metaindexRow
。
// lib/mergeset/metaindex_row.go
// metaindexRow 描述了一个 blockHeaders 即索引块。
type metaindexRow struct {
// 第一个 block 中的第一个 item 元素
// 它用于快速查找所需的索引块
firstItem []byte
// 块包含的 blockHeaders 的数量
blockHeadersCount uint32
// 索引文件中块的偏移量
indexBlockOffset uint64
// 索引文件中块的大小
indexBlockSize uint32
}
除了 part
之外还有一个内存中的 inmemoryPart
结构,其基本结构和 part
类似,不同的是几个相关的属性不是文件对象,而是 ByteBuffer
,因为是内存中的结构。
// lib/mergeset/inmemory_part.go
// 在内存中的 Part 结构
type inmemoryPart struct {
// partHeader 记录 itemsCount, blocksCount, firstItem, lastItem 信息, 最后会序列化到 metadata.json
ph partHeader
// 当前 block 的 header 信息,有 commonPrefix, firstItem, marshalType, itemsCount, itemsBlockOffset, lenBlockOffset, itemsBlockSize, lenBlockSize
bh blockHeader
// 当前 block 的 metaindex 信息,存储了当前 blockHeader 的 firstItem, blockHeaderCount, indexBlockOffset, indexBlockSize
mr metaindexRow
// 用于序列化后写入内存/磁盘文件使用
metaindexData bytesutil.ByteBuffer // -> metaindex.bin
indexData bytesutil.ByteBuffer // -> index.bin
itemsData bytesutil.ByteBuffer // -> items.bin
lensData bytesutil.ByteBuffer // -> lens.bin
}
其他几个属性上面介绍过,blockHeader
结构如下所示,用于记录 block 头信息:
// lib/mergeset/block_header.go
type blockHeader struct {
// 块中所有 items 的公用前缀
commonPrefix []byte
// 第一个 item
firstItem []byte
// 用于块压缩的 Marshal 类型
marshalType marshalType
// 块中的 items 数,不包括第一个 item
itemsCount uint32
// items block 的偏移量
itemsBlockOffset uint64
// lens block 的偏移量
lensBlockOffset uint64
// items block 的大小
itemsBlockSize uint32
// lens block 的大小
lensBlockSize uint32
}
整个 part
的结构看上去确实比较复杂,为什么需要设计这些属性?核心肯定就是为了快速索引,我们先往下分析,待会再回过头来看。
inmemoryPart
是 part
读入内存中的结构, 在 inmemoryBlock
merge 之前,每个 inmemoryBlock
都会先通过 mp.Init
转换成一个 inmemoryPart
的结构,inmemoryPart
中 metaindexData
、indexData
、itemsData
、lensData
数据结构与磁盘对应的文件内容一致。
序列化数据
现在我们再回到上面的 mergeInmemoryBlocks
函数,流程如下所示:
1.将所有的 inmemoryBlock
转换为inmemoryPart
结构2.为每个 inmemoryPart
构造blockStreamReader
,用于迭代读取 items3.创建一个新的 inmemoryPart
,并构造一个blockSteamWriter
用于合并写入的数据4.然后调用 mergeBlockStreams
函数执行真正的 merge 操作
首先通过 Init
函数将 inmemoryBlock
转换为 inmemoryPart
结构。
// lib/mergeset/inmemory_part.go
// Init 初始化 mp 从 ib.
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
mp.Reset()
sb := &storageBlock{}
sb.itemsData = mp.itemsData.B[:0]
sb.lensData = mp.lensData.B[:0]
// 使用尽可能小的压缩等级来压缩 inmemoryPart,因为它很快就会被合并到文件 part 去。
compressLevel := -5
// 序列化乱序的数据
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
// 获取 partHeader 值
mp.ph.itemsCount = uint64(len(ib.items))
mp.ph.blocksCount = 1
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
// 获取itemsData,更新blockHeader的items偏移和数量
mp.itemsData.B = sb.itemsData
mp.bh.itemsBlockOffset = 0
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
// 获取lensData,更新blockHeader的lens偏移和数量
mp.lensData.B = sb.lensData
mp.bh.lensBlockOffset = 0
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
// 获取 indexData,blockHeader序列化的值
bb := inmemoryPartBytePool.Get()
bb.B = mp.bh.Marshal(bb.B[:0])
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, 0)
// 获取 metaindexData,metaindexRow序列化的值
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
mp.mr.blockHeadersCount = 1
mp.mr.indexBlockOffset = 0
mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
bb.B = mp.mr.Marshal(bb.B[:0])
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, 0)
inmemoryPartBytePool.Put(bb)
}
上面的函数将 inmemoryBlock
转换成 inmemoryPart
,首先会通过一个 MarshalUnsortedData
函数来序列化未排序的数据。
// MarshalUnsortedData 序列化未排序的 items 从 ib 到 sb.
//
// It also:
// - 将第一个 item 追加到 firstItemDst 并返回结果
// - 将所有 item 的公共前缀附加到 commonPrefixDst 并返回结果
// - 返回包含第一个 item 的编码项的数量
// - 返回用于编码的 marshal 类型
func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
if !ib.isSorted() {
sort.Sort(ib) // 排序
}
// 更新内存块的公共前缀
ib.updateCommonPrefix()
// 序列化数据
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
}
上面的序列化函数中首先会对未排序的数据进行排序,然后更新内存块的公共前缀:
// lib/mergeset/encoding.go
// 更新公共前缀
func (ib *inmemoryBlock) updateCommonPrefix() {
ib.commonPrefix = ib.commonPrefix[:0] // 公共前缀
if len(ib.items) == 0 {
return
}
items := ib.items // 数据前后位置
data := ib.data // 数据
cp := items[0].Bytes(data) // 第一段数据
if len(cp) == 0 {
return
}
for _, it := range items[1:] { // 后面的数据
// 计算公共前缀的长度
cpLen := commonPrefixLen(cp, it.Bytes(data))
if cpLen == 0 {
return
}
// 截取公共前缀数据
cp = cp[:cpLen]
}
// 设置内存块的公共前缀
ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
}
公共前缀就是把每段数据包含的共同前缀提取出来,这样存储的时候后面就可以不需要存储共同的部分了,减少存储空间。
公共前缀提取出来后,接下来调用 marshalData
函数去序列化数据。
// lib/mergeset/encoding.go
// 前提条件:
// - ib.items 必须排序
// - updateCommonPrefix 必须被调用
// 序列化数据
func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
......
// 拷贝 inmemoryBlock 数据块的 firstItem(排序后的第一条数据)
data := ib.data // 内存块数据
firstItem := ib.items[0].Bytes(data) // 第一条数据
firstItemDst = append(firstItemDst, firstItem...)
// 最大公共前缀
commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
// 内存块数据小于2段或(数据大小-公共前缀长度*数据段大小 < 64) 则定义为小块
if len(data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {
// 对small block使用普通序列化,因为它更便宜
ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain
}
bbItems := bbPool.Get()
bItems := bbItems.B[:0] // 保存目的 items 数据的内存 buffer
bbLens := bbPool.Get()
bLens := bbLens.B[:0] // 保存目的 lens 数据的内存buffer
// 序列化 items 数据
// 第一项数据不需要存储,所以获取的 Uint64s 大小要减1
xs := encoding.GetUint64s(len(ib.items) - 1)
defer encoding.PutUint64s(xs)
cpLen := len(ib.commonPrefix) // 公共前缀的长度
prevItem := firstItem[cpLen:] // 第一项数据(排除公共前缀)
prevPrefixLen := uint64(0)
// 从第二个元素开始遍历(第一个 firstItem 单独存储)
for i, it := range ib.items[1:] {
// 偏移到公共前缀之后的位置
it.Start += uint32(cpLen)
// Bytes(data) 得到的数据不包含公共前缀的部分
item := it.Bytes(data)
// 计算第 N 项和 N-1 项的公共前缀长度
prefixLen := uint64(commonPrefixLen(prevItem, item))
// 仅仅只把差异的部分拷贝到目的buffer
bItems = append(bItems, item[prefixLen:]...)
// 第一次,与0异或,还是等于原值。异或后,两个整数值前面相同的部分都为0了,数值变得更短,能够便于压缩。
xLen := prefixLen ^ prevPrefixLen
// 上次的除去公共前缀的item
prevItem = item
// 上次计算得到的公共前缀长度
prevPrefixLen = prefixLen
xs.A[i] = xLen // 异或后的公共前缀值
}
// 对N-1个长度进行序列化(将uint64数组序列化成byte数组)
bLens = encoding.MarshalVarUint64s(bLens, xs.A)
// 将items数据(只有差异的部分)ZSTD压缩后,写入storageBlock
sb.itemsData = encoding.CompressZSTDLevel(sb.itemsData[:0], bItems, compressLevel)
bbItems.B = bItems
bbPool.Put(bbItems)
// 序列化 lens 数据
// 第一项数据大小(排除公共前缀)
prevItemLen := uint64(len(firstItem) - cpLen)
for i, it := range ib.items[1:] { // 从第二个元素开始遍历
// item长度 = End-Start-公共前缀大小
itemLen := uint64(int(it.End-it.Start) - cpLen)
// 与前面一个元素长度异或
xLen := itemLen ^ prevItemLen
// 上次去除公共前缀的长度
prevItemLen = itemLen
xs.A[i] = xLen // 异或后的元素长度
}
// 前面记录的是两两相对的长度,这里记录的是数据的真实长度
// 长度信息包含两种,相对长度和总长度
bLens = encoding.MarshalVarUint64s(bLens, xs.A)
// 将lens数据进行ZSTD压缩后,写入storageBlock
sb.lensData = encoding.CompressZSTDLevel(sb.lensData[:0], bLens, compressLevel)
bbLens.B = bLens
bbPool.Put(bbLens)
// 如果压缩不到90%则选择不压缩
if float64(len(sb.itemsData)) > 0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items)) {
// 压缩率不高的时候,选择不压缩
ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain
}
// 很好的压缩率
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
}
上面的序列化函数看上去比较复杂,实际上核心的一点就是想办法尽可能减少存储空间。首先将数据块的第一个数据拷贝出来放入 firstItemDst
,然后后面就从第二个元素开始去循环处理,首先计算第 N
项和 N-1
项的公共前缀长度,然后将差异的数据部分保存起来,为了能够反序列化回数据,还需要将两两之间公共前缀的长度保存下来,为了能够便于压缩,使用异或的方式来计算两两之间的公共前缀长度值。
循环计算后,将保存的两两之间的公共前缀长度进行序列化,下面的函数将一个 uint64
类型的切片转换成字节切片,如果数据小于 128 直接转换即可,如果大于 127 则用一个 7bit 来表示数值的内容,最高位后面的一个字节用来表示长度,这样就可以用变长长度来序列化数值,而不是每个数值都占用固定的长度。
// lib/encoding/int.go
// 将uint64切片转成字节切片
func MarshalVarUint64s(dst []byte, us []uint64) []byte {
for _, u := range us {
if u < 0x80 { // 小于128,直接加入到 dst,能直接存到 byte 中去
// Fast path
dst = append(dst, byte(u))
continue
}
for u > 0x7f { // 大于127,则超过的部分保留为 0x80,低位右移7位继续计算
dst = append(dst, 0x80|byte(u))
u >>= 7
}
dst = append(dst, byte(u))
}
return dst
}
长度数据序列化后,将 items 数据(只有差异的部分)进行 ZSTD
压缩后,写入 storageBlock。
只记录两两之间的公共前缀长度还不够,还需要记录数据的真实长度,最后同样再将 lens 数据进行 ZSTD
压缩后,写入 storageBlock。
如果最后的结果压缩不到 90% 则选择不压缩,不压缩则使用 marshalDataPlain
函数进行序列化:
// lib/mergeset/encoding.go
// 普通序列化数据
func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {
data := ib.data
// 序列化 items 数据
// 不需要序列化第一项数据,因为它会在 marshalData 中返回给调用者。
cpLen := len(ib.commonPrefix) // 公共前缀长度
b := sb.itemsData[:0]
for _, it := range ib.items[1:] { // 第一项之后的数据
it.Start += uint32(cpLen) // 跳过公共前缀
b = append(b, it.String(data)...) // 添加移出公共前缀的数据
}
sb.itemsData = b // itemsData数据
// 序列化 lens 数据
b = sb.lensData[:0]
for _, it := range ib.items[1:] { // 第一项之后的数据
// 原始的End-Start-公共前缀长度
b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen))
}
sb.lensData = b
}
经过上面的序列化过后就可以得到第一个数据、公共前缀、items 个数以及序列化类型,然后将这些数据存入 blockHeader
中去,后面就是一些比较简单的常规操作。
转换成 inmemoryPart
后,再包装成 blockStreamReader
,创建一个新的 inmemoryPart
,并构造一个 blockSteamWriter
用于合并写入的数据,然后调用 mergeBlockStreams
函数执行真正的 merge 操作。
// lib/mergeset/merge.go
// mergeBlockStreams 合并 bsrs 并将结果写入 bsw
//
// 也填充了 ph
//
// prepareBlock 是可选的
//
// 当 stopCh 关闭时,该函数立即返回
//
// 它还以原子方式将合并的 items 添加到 itemsMerged
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},
itemsMerged *uint64) error {
// 将多个 blockStreamReader 构造成一个 blockStreamMerger 结构
bsm := bsmPool.Get().(*blockStreamMerger)
if err := bsm.Init(bsrs, prepareBlock); err != nil {
return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)
}
err := bsm.Merge(bsw, ph, stopCh, itemsMerged)
bsm.reset()
bsmPool.Put(bsm)
bsw.MustClose()
if err == nil {
return nil
}
return fmt.Errorf("cannot merge %d block streams: %s: %w", len(bsrs), bsrs, err)
}
首先把多个 blockStreamReader
构造成一个 blockStreamMerger
结构, merger 里面主要是一个 bsrHeap
堆用于维护 bsrs
,用于 merge 数据时的排序。首先通过 merger 的 Init
函数构造堆排序的结构,然后核心是调用 merger 的 Merge
函数进行处理。
// lib/mergeset/merge.go
func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {
again:
if len(bsm.bsrHeap) == 0 {
// 将最后的 inmemoryBlock(可能不完整)写入 bsw
bsm.flushIB(bsw, ph, itemsMerged)
return nil
}
select {
case <-stopCh:
return errForciblyStopped
default:
}
// 取出 blockStreamReader
bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader)
var nextItem []byte // 下一个 blockStreamReader
hasNextItem := false
if len(bsm.bsrHeap) > 0 {
nextItem = bsm.bsrHeap[0].bh.firstItem
hasNextItem = true
}
items := bsr.Block.items
data := bsr.Block.data
// 循环所有的 items
for bsr.blockItemIdx < len(bsr.Block.items) {
item := items[bsr.blockItemIdx].Bytes(data)
if hasNextItem && string(item) > string(nextItem) {
break
}
// 添加元素
if !bsm.ib.Add(item) {
// bsm.ib 已满,将其刷新到 bsw 并继续
bsm.flushIB(bsw, ph, itemsMerged)
continue
}
bsr.blockItemIdx++
}
if bsr.blockItemIdx == len(bsr.Block.items) {
// bsr.Block 已完全读取,处理下一个 block
if bsr.Next() {
heap.Push(&bsm.bsrHeap, bsr)
goto again
}
if err := bsr.Error(); err != nil {
return fmt.Errorf("cannot read storageBlock: %w", err)
}
goto again
}
// bsr.Block 中的下一个 item 超过了 nextItem
// 调整 bsr.bh.firstItem 并将 bsr 返回到堆
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)
heap.Push(&bsm.bsrHeap, bsr)
goto again
}
这里主要解决的问题是多个有序的字节数组(inmemoryPart),按照字节序排序,合成一个 inmemoryPart
的过程,在 merge 的过程中,每 64KB 会单独创建一个 blockHeader
,用于快速索引该 block 里面的 Items。
持久化数据
最后重复上面的过程,将 n
个 inmemoryBlock
合并成 (n-1)/defaultPartsToMerge+1
个 inmemoryPart
,最后再调用 mergeParts
函数完成索引持久化操作,持久化后生成的索引 part,主要包含 metaindex.bin
、index.bin
、lens.bin
、items.bin
、metadata.json
等 5 个文件。
这几个文件的关系如下图所示, metaindex.bin
文件通过 metaindexRow
索引 index.bin
文件,index.bin
文件通过 indexBlock
中的 blockHeader
同时索引 items.bin
文件和 items.bin
文件。
metaindex.bin:文件包含一系列的 metaindexRow
数据,每个 metaindexRow
中包含第一条数据 firstItem
、索引块包含的块头部数 blockHeadersCount
、索引块偏移 indexBlockOffset
以及索引块大小 indexBlockSize
。
metaindexRow
在文件中按照firstItem
的大小的字典序排序存储,以支持二分查找metaindex.bin
文件使用 ZSTD 进行压缩metaindex.bin
文件中的内容在 part 打开时,会全部读出加载至内存中,以加速查询过滤metaindexRow
包含的firstItem
为其索引的indexBlock
中所有blockHeader
中字典序最小的firstItem
查找时根据 firstItem
进行二分检索
index.bin:文件中包含一系列的 indexBlock
, 每个 indexBlock
又包含一系列 blockHeader
,每个 blockHeader
包含 item 的公共前缀 commonPrefix
、第一项数据 firstItem
、itemsData
的序列化类型 marshalType
、itemsData
包含的 item 数、item 块的偏移 itemsBlockOffset
等内容,就是前面使用将 inmemoryBlock
转换为 inmemoryPart
结构的 Init
函数得到的。
每个 indexBlock
使用ZSTD
压缩算法进行压缩在 indexBlock
中查找时,根据firstItem
进行二分检索blockHeader
items.bin 文件中,包含一系列的 itemsData
, 每个 itemsData
又包含一系列的 Item。
itemsData
会视情况而定来是否使用 ZTSD 压缩,当 item 个数小于 2 时,或者itemsData
的长度小于 64 字节时,不压缩;当itemsData
使用 ZSTD 压缩后的压缩率大于90%的时候也不压缩每个 item 在存储时,去掉了 blockHeader
中的公共前缀commonPrefix
以提高压缩率
lens.bin 文件中,包含一系列的 lensData
, 每个 lensData
又包含一系列 8 字节的长度 len, 长度 len 标识 items.bin
文件中对应 item 的长度。在读取或者需要解析 itemsData
中的 item 时,先要读取对应的 lensData
中对应的长度 len。 当 itemsData
进行压缩时,lensData
会先使用异或算法进行压缩,然后再使用 ZSTD 算法进一步压缩。
到这里我们就了解了索引数据是实现和存储原理了,那么真正的指标数据又是如何去存储的呢?未完待续......